
feat: PG-Sync V2 — ops pipeline with BitmapSink dual-path#85

Merged
JustMaier merged 19 commits into main from feat/sync-v2
Mar 26, 2026
Conversation

@JustMaier
Contributor

Summary

  • Ops-based outbox replacing the V1 enrichment-heavy poller (80M rows behind, can never catch up)
  • Self-contained ops: PG triggers emit field-level deltas (set/remove/add/delete/queryOpSet) — no enrichment queries
  • Two processing paths per design doc:
    • Steady-state: WAL reader → dedup → CoalescerSink → crossbeam channel → flush thread (2.7K ops/s)
    • Dump mode: CSV → parse → AccumSink → BitmapAccum → apply_accum (414K images/s, 931K total ops/s)
  • creates_slot flag on EntityOps — only Image table sets alive, persisted in WAL binary header
  • Validation harness with three modes (direct/WAL/steady-state), tested at 10K/100K/1M
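The dual-path design above can be sketched as one sink interface with two implementations. The names `BitmapSink`, `CoalescerSink`, and `AccumSink` come from this PR; the mutation type, the std `mpsc` channel (standing in for the crossbeam channel), and the set-based accumulator (standing in for `BitmapAccum`) are simplifications for illustration:

```rust
use std::collections::HashSet;
use std::sync::mpsc;

// Simplified stand-in for the real mutation types
// (FilterInsert/FilterRemove/SortSet/SortClear/AliveInsert/AliveRemove).
#[derive(Debug, Clone, PartialEq)]
enum Mutation {
    AliveInsert(u64),
    FilterInsert { field: &'static str, key: u32, id: u64 },
}

// Both processing paths implement the same sink interface.
trait BitmapSink {
    fn push(&mut self, m: Mutation);
}

// Steady-state path: forward each mutation toward the flush thread.
struct CoalescerSink {
    tx: mpsc::Sender<Mutation>,
}

impl BitmapSink for CoalescerSink {
    fn push(&mut self, m: Mutation) {
        let _ = self.tx.send(m);
    }
}

// Dump path: accumulate locally, merge into the engine once at the end,
// bypassing the coalescer entirely.
#[derive(Default)]
struct AccumSink {
    alive: HashSet<u64>,
}

impl BitmapSink for AccumSink {
    fn push(&mut self, m: Mutation) {
        if let Mutation::AliveInsert(id) = m {
            self.alive.insert(id);
        }
    }
}
```

The key property is that the op-translation code upstream is identical for both paths; only the sink behind the trait changes between steady-state and dump mode.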

Key Files

| File | Change |
| --- | --- |
| src/ops_processor.rs | Rewritten — BitmapSink-based, FieldMeta config awareness, process_csv_dump_direct() |
| src/ingester.rs | CoalescerSink + AccumSink (existing, now used by ops processor) |
| src/pg_sync/ops.rs | EntityOps.creates_slot flag, new()/with_alive() constructors |
| src/ops_wal.rs | WAL format v2: 1-byte flags field for creates_slot |
| src/pg_sync/op_dedup.rs | Preserves creates_slot via OR across merged sources |
| src/concurrent_engine.rs | apply_accum(), mutation_sender() |
| src/server.rs | WAL reader uses CoalescerSink (not engine.put()) |
| docs/design/pg-sync-v2-final.md | Updated: pg-sync = data mover, BitDex = all processing |

Plus 8 modules from original PR #84: ops, op_dedup, ops_wal, trigger_gen, dump, csv_ops, ops_poller, metrics

Benchmarks (1M scale)

| Path | Images/s | Tags/s | Tools/s | Total |
| --- | --- | --- | --- | --- |
| Direct dump (AccumSink) | 414K | 2.9M | 3.3M | 931K ops/s |
| WAL dump | 41K | | | 41K ops/s |
| Single-pass baseline | 345K | | | 345K/s |

Direct dump path is 20% faster than the single-pass loader baseline.

Test plan

  • 14 unit tests (ops processor: set/remove/add/delete, sort bit decomposition, filter parsing, cursor persistence)
  • 9 integration tests (WAL roundtrip, dedup, queryOpSet, dump registry, Civitai config)
  • CSV validation PASS at 10K, 100K, 1M (zero errors, alive count exact match)
  • Steady-state path validated at 10K (CoalescerSink → flush thread → alive count match)
  • Full-scale 107M validation (requires ~15min run)
  • Rebase verification: all existing tests pass

🤖 Generated with Claude Code

JustMaier and others added 19 commits March 25, 2026 19:50
Core building blocks for the ops-based sync pipeline:

- src/pg_sync/ops.rs: Op enum (Set, Remove, Add, Delete, QueryOpSet),
  OpsRow, OpsBatch, EntityOps, SyncMeta, BitdexOps table SQL
- src/pg_sync/op_dedup.rs: Shared dedup helper — LIFO per (entity_id, field),
  add/remove cancellation, delete absorption, queryOpSet last-wins
- src/ops_wal.rs: Append-only WAL with CRC32 integrity, WalWriter (append+fsync),
  WalReader (cursor-based tail, partial record handling, CRC skip)

Also fixes pre-existing compile error in copy_queries.rs tests (missing
width/height fields on CopyImageRow constructors).

30 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
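The add/remove cancellation rule in op_dedup.rs can be sketched for multi-value fields. This is a simplified model: the real module keys by (entity_id, field) and also handles Set, delete absorption, and queryOpSet last-wins; here only the cancellation of opposing Add/Remove ops on the same value is shown, and the `Op` shape is hypothetical:

```rust
use std::collections::HashMap;

// Simplified multi-value ops; the real enum also has Set, Delete, QueryOpSet.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Op {
    Add(i64),
    Remove(i64),
}

// An Add followed by a Remove of the same value (or vice versa) cancels to
// nothing; otherwise the later op wins.
fn dedup_multi_value(ops: &[(u64, &'static str, Op)]) -> Vec<(u64, &'static str, Op)> {
    let mut last: HashMap<(u64, &'static str, i64), (usize, Op)> = HashMap::new();
    for (i, &(id, field, op)) in ops.iter().enumerate() {
        let v = match op {
            Op::Add(v) | Op::Remove(v) => v,
        };
        let key = (id, field, v);
        match (last.get(&key).map(|&(_, prev)| prev), op) {
            // Opposing ops on the same value: both drop.
            (Some(Op::Add(_)), Op::Remove(_)) | (Some(Op::Remove(_)), Op::Add(_)) => {
                last.remove(&key);
            }
            // Otherwise keep only the latest op for this value.
            _ => {
                last.insert(key, (i, op));
            }
        }
    }
    // Emit survivors in their original order.
    let mut out: Vec<(usize, (u64, &'static str, Op))> = last
        .into_iter()
        .map(|((id, field, _), (i, op))| (i, (id, field, op)))
        .collect();
    out.sort_by_key(|&(i, _)| i);
    out.into_iter().map(|(_, x)| x).collect()
}
```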
WAL-backed ops ingestion endpoint:
- POST /api/indexes/{name}/ops accepts OpsBatch (ops + sync meta)
- Appends to WAL file via WalWriter, returns 200 only after fsync
- Lazy WAL writer init (created on first POST)
- Stores latest SyncMeta per source for lag monitoring

Sync lag endpoint:
- GET /api/internal/sync-lag returns latest metadata from all sync sources
- Supports cursor position, max_id, lag_rows per source

Both endpoints compile-gated behind pg-sync feature with no-op fallbacks.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Ops processor that reads from WAL and routes to the engine:
- Regular ops (set/remove/add): build PatchPayload with old+new values,
  call engine.patch() — no docstore read needed
- queryOpSet: parse filter string, execute query for matching slots,
  apply nested ops to all matches
- Delete: route to engine.delete()
- Filter parser for queryOpSet: supports eq and in operators

Includes json_to_qvalue converter (serde_json::Value → query::Value)
for the PatchPayload/FieldValue type boundary.

9 tests: scalar update, insert (no old), multi-value add/remove,
delete+queryOpSet skip, filter parsing, value type parsing, cursor
persistence.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…dpoint

New ops_poller.rs replaces outbox_poller for V2 sync:
- Reads from BitdexOps table (JSONB ops arrays) instead of BitdexOutbox
- Cursor managed in PG bitdex_cursors table (not in BitDex)
- Deduplicates via shared dedup_ops() before sending
- POSTs OpsBatch with SyncMeta (cursor, max_id, lag) to /ops endpoint
- Health gate: pauses when BitDex is unreachable

Also adds post_ops() to BitdexClient.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Config-driven PG trigger generation:
- SyncSource struct: direct tables (slot_field + track_fields),
  multi-value join tables (field + value_field), fan-out tables
  (query + query_source)
- SyncConfig: YAML-parseable config with sync_sources array
- SQL generator: CREATE OR REPLACE FUNCTION + CREATE TRIGGER for each source
- Expression interpolation in track_fields: "GREATEST({scannedAt}, {createdAt}) as existedAt"
- {column} placeholder substitution with OLD/NEW prefixes
- Hash-based trigger naming (bitdex_{table}_{hash8}) for reconciliation
- IS DISTINCT FROM checks for UPDATE ops (only emit when value actually changes)
- queryOpSet generation for fan-out tables
- ENABLE ALWAYS on all triggers (CDC compatibility)

11 tests: parsing, column substitution, all three trigger types,
hash change detection, YAML parsing, expression interpolation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
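The {column} placeholder substitution described above might look like the following sketch. The quoting style and function signature are assumptions (camelCase PG columns like scannedAt need double quotes in generated SQL); the real generator also chooses OLD vs NEW per op type:

```rust
// Substitute {column} placeholders with a row-record prefix, e.g.
// "GREATEST({scannedAt}, {createdAt})" with prefix "NEW" becomes
// "GREATEST(NEW.\"scannedAt\", NEW.\"createdAt\")".
fn substitute_columns(expr: &str, prefix: &str) -> String {
    let mut out = String::with_capacity(expr.len());
    let mut rest = expr;
    while let Some(start) = rest.find('{') {
        out.push_str(&rest[..start]);
        match rest[start + 1..].find('}') {
            Some(end) => {
                let col = &rest[start + 1..start + 1 + end];
                // Double-quote so camelCase column names survive PG case folding.
                out.push_str(&format!("{prefix}.\"{col}\""));
                rest = &rest[start + 1 + end + 1..];
            }
            None => {
                // Unterminated placeholder: emit the remainder verbatim.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}
```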
Dump lifecycle management for unified load pipeline:
- DumpRegistry: in-memory + JSON-persisted dump state tracking
- DumpEntry: name, wal_path, status (Writing/Loading/Complete/Failed),
  ops counts, timestamps
- dump_name() + config_hash() for change detection
- Atomic save via temp file rename

Server endpoints:
- GET /dumps — list all dumps with status
- PUT /dumps — register new dump
- POST /dumps/{name}/loaded — signal WAL file complete
- DELETE /dumps/{name} — remove from history
- POST /dumps/clear — clear all

All endpoints feature-gated behind pg-sync with no-op fallbacks.
10 tests: lifecycle, persistence, removal, completion tracking,
config hash determinism, failure handling.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
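The atomic save via temp file rename mentioned above follows the standard write-then-rename pattern. A minimal sketch (the function name is hypothetical; it assumes the temp file and target live on the same filesystem, where rename is atomic):

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

// Write to a sibling temp file, fsync, then rename over the target so
// readers never observe a half-written registry file.
fn save_atomic(path: &Path, contents: &[u8]) -> std::io::Result<()> {
    let tmp = path.with_extension("tmp");
    {
        let mut f = fs::File::create(&tmp)?;
        f.write_all(contents)?;
        f.sync_all()?; // flush to disk before rename so a crash can't lose data
    }
    fs::rename(&tmp, path)
}
```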
New unified sync metrics with source label:
- bitdex_sync_cursor_position{source="..."} — current cursor
- bitdex_sync_max_id{source="..."} — max ops table ID
- bitdex_sync_lag_rows{source="..."} — rows behind
- bitdex_sync_ops_total{source="..."} — total ops received
- bitdex_sync_wal_bytes{source="..."} — WAL file size

Metrics populated from SyncMeta in the POST /ops endpoint.
Old bitdex_pgsync_* metrics preserved for backward compat.
Binary rename (bitdex-pg-sync → bitdex-sync) deferred to deployment PR.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
9 integration tests exercising the full ops pipeline:
- WAL roundtrip with dedup (write → read → dedup → verify)
- Delete absorption through WAL
- Add/remove cancellation through WAL
- queryOpSet serialization through WAL
- Cursor resume across multiple appends
- Dump registry full workflow (register → load → complete → persist)
- Dump config change detection (hash mismatch triggers re-dump)
- Full Civitai trigger config (6 sources, all generate valid SQL)
- OpsBatch JSON format roundtrip with SyncMeta

Total: 69 tests across all Sync V2 modules (60 unit + 9 integration).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Spawns a background thread that tails the ops WAL file, reads batches
of up to 10K records, deduplicates via dedup_ops(), and applies
mutations to the engine via apply_ops_batch(). Persists cursor to
disk after each batch. Updates bitdex_sync_wal_bytes metric.

This completes the full ops ingestion chain:
POST /ops → WAL append + fsync → WAL reader thread → engine mutations

The reader sleeps 50ms when no new records are available, and 1s when
no index is loaded yet. Errors are logged and retried.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
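The partial-record handling in a tail read can be sketched as a single poll over a byte buffer. The framing below (4-byte little-endian length prefix per record) is an assumption for illustration; the real WAL also stores a CRC32 per record and skips corrupt ones, and the caller supplies the 50ms/1s sleep policy around this:

```rust
// One poll of the WAL tail: parse complete [len u32 LE][payload] records
// starting at `cursor`, stop at the first partial record (the writer may
// still be appending it), and return the records plus the advanced cursor.
fn poll_once(wal: &[u8], cursor: usize) -> (Vec<Vec<u8>>, usize) {
    let mut records = Vec::new();
    let mut pos = cursor;
    loop {
        // Need a full 4-byte length header before reading anything.
        let Some(header) = wal.get(pos..pos + 4) else { break };
        let len = u32::from_le_bytes(header.try_into().unwrap()) as usize;
        // Partial payload: stop here and resume from `pos` next poll.
        let Some(payload) = wal.get(pos + 4..pos + 4 + len) else { break };
        records.push(payload.to_vec());
        pos += 4 + len;
    }
    (records, pos)
}
```

Because the returned cursor never advances past an incomplete record, persisting it after each batch makes resume-after-restart safe.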
CSV dump adapter (csv_ops.rs):
- images_csv_to_wal(): parses images.csv, converts each row to set ops
  (nsfwLevel, type, userId, postId, hasMeta, onSite, minor, poi, existedAt, blockedFor)
- tags_csv_to_wal(), tools_csv_to_wal(): multi-value CSV → add ops
- run_csv_dump(): orchestrates full CSV dump with optional row limits
- Supports batch writing to WAL with configurable batch size
- Limited variants for validation testing with subsets

WAL reader thread (server.rs):
- Spawned on server startup, tails ops.wal, reads batches of 10K
- Deduplicates and applies via apply_ops_batch()
- Persists cursor to disk, updates WAL bytes metric
- Completes the full chain: POST /ops → WAL → reader → engine

2 new tests + previous tests still passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rewrites ops_processor.rs per the Sync V2 design doc:

Steady-state path: ops → BitmapSink (CoalescerSink) → coalescer channel.
No more engine.put() — ops translate directly to FilterInsert/FilterRemove/
SortSet/SortClear/AliveInsert/AliveRemove via the existing mutation helpers
(value_to_bitmap_key, value_to_sort_u32).

Dump path: ops → BitmapSink (AccumSink) → BitmapAccum → apply_accum().
Bypasses coalescer, snapshot publishing, and cache invalidation entirely.
process_csv_dump_direct() goes CSV → ops → AccumSink in one pass.

Key changes:
- FieldMeta: precomputed field metadata from Config (filter/sort field types)
- creates_slot flag on EntityOps (persisted in WAL binary header)
- apply_accum() on ConcurrentEngine for direct staging merge
- mutation_sender() exposed on ConcurrentEngine for CoalescerSink
- WAL format updated: 1-byte flags field after entity_id
- Dedup preserves creates_slot via OR across merged sources
- Validation harness supports --direct, --steady-state, WAL dump modes

Benchmarks at 1M scale:
- Direct dump: 367K images/s (beats 345K/s single-pass baseline)
- WAL dump: 41K ops/s
- Steady-state: 2.7K ops/s (expected — per-op channel overhead)

14 unit tests + 9 integration tests passing.
CSV validation PASS at 10K, 100K, 1M with zero errors.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…rocessing

Reflects the architecture split validated by benchmarks:
- pg-sync: thin data mover (COPY→CSV, cursor management, ops polling)
- BitDex: all processing (CSV parse, ops→AccumSink, bitmap accumulation)
- Dumps skip WAL entirely — CSV→AccumSink direct path at 367K/s
- Updated throughput table with measured numbers
- Boot sequence uses pre_dump_cursor for gap safety

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Uses rayon fold+reduce for parallel CSV parsing, matching the
single-pass loader pattern. Each rayon worker builds a thread-local
BitmapAccum, merged at the end via bitmap OR.

1M benchmark: 2.7M ops/s total (images 2.0M/s, tags 5.4M/s, tools 8.8M/s).
Previous single-threaded: 931K ops/s. Speedup: 2.9x.
vs single-pass baseline (345K/s): 5.8x faster on images.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
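The fold+reduce shape above can be illustrated with std threads standing in for rayon: each worker builds a thread-local accumulator with no shared locks, and the partials are merged at the end. The function and the set-based accumulator are simplifications (the real code uses rayon's `fold`/`reduce` and merges `BitmapAccum`s via bitmap OR); it assumes `workers >= 1`:

```rust
use std::collections::HashSet;
use std::thread;

// "fold": each thread parses its own chunk of CSV lines into a local set of
// entity IDs. "reduce": the thread-local partials are merged at the end
// (a set union here, a bitmap OR in the real BitmapAccum).
fn parallel_accumulate(lines: &[&str], workers: usize) -> HashSet<u64> {
    let chunk = ((lines.len() + workers - 1) / workers).max(1);
    thread::scope(|s| {
        let handles: Vec<_> = lines
            .chunks(chunk)
            .map(|part| {
                s.spawn(move || {
                    // Local accumulator: no contention during the parse.
                    part.iter()
                        .filter_map(|l| l.split(',').next()?.parse::<u64>().ok())
                        .collect::<HashSet<u64>>()
                })
            })
            .collect();
        // Merge the partials once all workers finish.
        handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .fold(HashSet::new(), |mut acc, part| {
                acc.extend(part);
                acc
            })
    })
}
```

The merge cost is paid once per worker rather than once per row, which is why the pattern scales close to linearly until the final OR dominates.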
…apply_accum

F1 (Critical): WAL reader was reading entire file every poll via fs::read().
Now uses seek(cursor) + read_exact() for incremental reads. O(new_data) per
poll instead of O(file_size).

F2 (Critical): apply_accum() cloned snapshot without loading mode, triggering
Arc clone cascade (94s stalls at 105M). Now enters/exits loading mode
automatically — staging refcount=1, no deep clones.

Also: chunked block reader for direct dump path (300MB blocks via reader thread).
Prevents OOM on 67GB tags.csv at full scale.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
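The F1 fix pattern (seek to the persisted cursor, read only appended bytes) can be sketched as follows; the function name is hypothetical:

```rust
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::path::Path;

// Read only the bytes appended since the last poll: O(new_data) per poll
// instead of O(file_size). Returns the new bytes and the advanced cursor.
fn read_new_bytes(path: &Path, cursor: u64) -> std::io::Result<(Vec<u8>, u64)> {
    let mut f = File::open(path)?;
    let len = f.metadata()?.len();
    if len <= cursor {
        return Ok((Vec::new(), cursor)); // nothing new appended yet
    }
    f.seek(SeekFrom::Start(cursor))?;
    let mut buf = vec![0u8; (len - cursor) as usize];
    f.read_exact(&mut buf)?;
    Ok((buf, len))
}
```

Compared with `fs::read()` on every poll, the cost of an idle poll drops to a single `metadata()` call.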
CH poller now emits Op::Set for reactionCount/commentCount/collectedCount
instead of V1 full-document patches. Removes PG dependency (metrics are
self-contained sort values from ClickHouse). Batched at 5K entities per
POST /ops request. creates_slot: false (sort-only, no alive bit changes).

6 unit tests for metrics→ops conversion.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Direct mode reads CSVs directly — no need to write WAL first.
Eliminates ~40s wasted I/O per run.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ader hang

Reverts to simple read_lines approach (no chunked block reader thread).
The chunked reader deadlocked due to sync_channel backpressure with rayon.

Loading mode entered once for the entire dump, exited after all tables.
Headless engines get a harmless timeout warning (no flush thread).

1M benchmark: 2.5M ops/s (images 1.35M/s, tags 5.1M/s, tools 6.8M/s).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Reads 10M lines per chunk instead of entire file. Each chunk is rayon
processed and applied to staging, then freed. Caps memory at ~2GB per
chunk instead of 67GB for the full tags file.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
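The chunked read-process-free loop can be sketched generically (the helper name and callback shape are assumptions; the real loader uses 10M-line chunks and hands each chunk to rayon):

```rust
use std::io::{BufRead, BufReader, Read};

// Process a large file in bounded-size line chunks so peak memory is
// proportional to the chunk, not the whole file.
fn for_each_chunk<R: Read>(reader: R, chunk_lines: usize, mut apply: impl FnMut(&[String])) {
    let mut chunk = Vec::with_capacity(chunk_lines);
    for line in BufReader::new(reader).lines() {
        chunk.push(line.expect("read error"));
        if chunk.len() == chunk_lines {
            apply(&chunk);
            chunk.clear(); // drop this chunk's lines before reading the next
        }
    }
    if !chunk.is_empty() {
        apply(&chunk); // final short chunk
    }
}
```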
@JustMaier JustMaier merged commit 316f6bc into main Mar 26, 2026
1 check failed